package com.ld.shieldsb.canalclient.handler.impl.solr;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import javax.sql.DataSource;

import org.apache.solr.client.solrj.SolrClient;

import com.ld.shieldsb.canalclient.etl.AbstractEtlService;
import com.ld.shieldsb.canalclient.etl.EtlConsumer;
import com.ld.shieldsb.canalclient.handler.config.AdapterConfig.AdapterMapping;
import com.ld.shieldsb.canalclient.handler.config.MappingConfig;
import com.ld.shieldsb.canalclient.handler.config.MappingConfig.DbMapping;
import com.ld.shieldsb.canalclient.util.CanalUtil;
import com.ld.shieldsb.canalclient.util.SyncUtil;
import com.ld.shieldsb.common.composition.util.ConvertUtil.MapUtil;
import com.ld.shieldsb.common.core.util.solr.SolrUtil;

/**
 * Solr ETL service: bulk-imports relational (RDB) data into a Solr collection.
 * Adapted from rewerma's RDB ETL service (2018-11-7, v1.0.0).
 *
 * @ClassName SolrEtlService
 * @author <a href="mailto:donggongai@126.com" target="_blank">吕凯</a>
 * @date 2022-01-14 14:54:05
 *
 */
public class SolrEtlService extends AbstractEtlService {

    /** Client used to write documents into the target Solr collection. */
    private final SolrClient solrClient;

    /**
     * Creates a Solr ETL service.
     *
     * @param solrClient client for the target Solr collection
     * @param config     source-table to Solr mapping configuration
     */
    public SolrEtlService(SolrClient solrClient, MappingConfig config) {
        super("Solr", config);
        this.solrClient = solrClient;
        // NOTE(review): possibly redundant if super("Solr", config) already stores
        // config — confirm in AbstractEtlService before removing.
        this.config = config;
    }

    /**
     * Runs the import: executes {@code sql} against the source datasource and writes every
     * result row as a Solr document, committing every {@code commitBatch} rows and once more
     * at the end (documents with the same id are overwritten on add).
     *
     * @param srcDS    source JDBC datasource
     * @param sql      extraction query to run against the source
     * @param values   positional parameters for {@code sql}
     * @param cnt      expected total row count (passed through to the consumer)
     * @param mapping  mapping config; must be a {@link DbMapping}
     * @param impCount running counter of successfully imported rows (shared with caller)
     * @param errMsg   collector for error messages (shared with caller)
     * @param con      callback notified with the final ETL state in the finally block
     * @return {@code true} when the import pipeline ran (Solr fields resolved and query
     *         executed); {@code false} on setup failure. NOTE(review): per-row failures
     *         inside the result-set loop are only reported via {@code errMsg}, not via the
     *         return value — confirm callers inspect {@code errMsg}.
     */
    @Override
    protected boolean executeSqlImport(DataSource srcDS, String sql, List<Object> values, long cnt, AdapterMapping mapping,
            AtomicLong impCount, List<String> errMsg, Consumer<EtlConsumer> con) {
        boolean result = false;
        try {
            DbMapping dbMapping = (DbMapping) mapping;
            // Column mapping: target (Solr) column as key, source column as value, case preserved.
            Map<String, String> columnsMap = new LinkedHashMap<>();
            // Column types; keys are the Solr schema field names (looked up case-insensitively below).
            Map<String, String> columnType = new LinkedHashMap<>();

            // All Solr field names, gathered from the live schema.
            List<String> columns = new ArrayList<>();
            SolrUtil.getFieldInfo(solrClient).forEach((key, field) -> {
                columnType.put(key, field.getType());
                columns.add(key);
            });
            columnsMap.putAll(SyncUtil.getColumnsMap(dbMapping, columns));

            if (MapUtil.isEmpty(columnsMap)) {
                errMsg.add(" 不能获取Solr的字段！");
            } else {
                // FIX: floor the batch size at 1 so a misconfigured commitBatch of 0 cannot
                // cause an ArithmeticException in the modulo below; also hoists the call out
                // of the per-row loop.
                final int commitBatch = Math.max(1, dbMapping.getCommitBatch());
                // Execute the extraction query; the lambda consumes the result set.
                CanalUtil.sqlRS(srcDS, sql, values, rs -> {
                    int idx = 1;

                    try {
                        // Tracks whether the most recent row ended with a batch commit,
                        // so we don't commit twice at the end.
                        boolean committed = false;

                        // TODO disable auto-commit and commit once at the end; Solr does not
                        // seem to support this — revisit.

                        while (rs.next()) {
                            committed = false;

                            // FIX: build a fresh document per row. The original reused one map
                            // across rows without clearing it, so a failed column read (the
                            // swallowed catch below) leaked the previous row's value into the
                            // current document.
                            Map<String, Object> docValues = new LinkedHashMap<>();

                            for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
                                String targetColumnName = entry.getKey();
                                String srcColumnName = entry.getValue();
                                if (srcColumnName == null) {
                                    // No explicit source column configured: derive it from the target name.
                                    srcColumnName = CanalUtil.cleanColumn(targetColumnName);
                                }
                                // Resolve the schema field name ignoring case; only copy the value
                                // when the target field actually exists in Solr.
                                String ctypeKey = SyncUtil.getIgnoreCaseColunm(columnType, targetColumnName);
                                String type = columnType.get(ctypeKey);
                                if (type != null) {
                                    try {
                                        docValues.put(ctypeKey, rs.getObject(srcColumnName));
                                    } catch (Exception e) {
                                        // Best-effort per column: a missing/unreadable source column
                                        // skips this field only. Traced for debugging.
                                        if (logger.isTraceEnabled()) {
                                            logger.trace("", e);
                                        }
                                    }
                                }
                            }

                            // Send the document (not a transaction commit); same-id documents
                            // in docValues overwrite existing ones.
                            SolrUtil.add(solrClient, docValues, false);

                            if (idx % commitBatch == 0) {
                                SolrUtil.commit(solrClient); // batch commit
                                committed = true;
                            }
                            idx++;
                            impCount.incrementAndGet();
                            if (logger.isDebugEnabled()) {
                                logger.debug("successful import count:" + impCount.get());
                            }
                        }
                        // Final commit for the trailing partial batch, unless the last row
                        // already triggered one.
                        if (!committed) {
                            SolrUtil.commit(solrClient);
                        }

                    } catch (Exception e) {
                        String msg = e.getMessage();
                        logger.error(dbMapping.getTable() + " etl 失败! ==>" + msg, e);
                        errMsg.add(dbMapping.getTable() + " etl 失败! ==>" + msg);
                    }
                    return idx;
                });
                result = true;
            }

        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return false;
        } finally {
            // Always notify the consumer with the terminal state, success or not.
            con.accept(EtlConsumer.builder().processState(EtlConsumer.PROCESS_STATE_END).srcDS(srcDS).sql(sql).values(values)
                    .mapping(mapping).impCount(impCount).errMsg(errMsg).success(result).count(cnt).build());
        }
        return result;
    }

}
